package com.atguigu.app.dim;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DimSinkFunction;
import com.atguigu.app.func.DimTableProcessFunction;
import com.atguigu.bean.TableProcess;
import com.atguigu.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


//Data flow: web/app -> MySQL -> Maxwell -> Kafka(ODS) -> FlinkApp -> Phoenix(DIM)
//Processes: Mock -> MySQL -> Maxwell -> Kafka(ZK) -> DimApp(FlinkCDC) -> Phoenix(HBase ZK/HDFS)
public class DimApp {

    public static void main(String[] args) throws Exception {

        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
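        // Parallelism of 1 is for local testing; in production it would typically match
        // the partition count of the Kafka topic being consumed.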

//        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//        env.getCheckpointConfig().enableExternalizedCheckpoints(
//                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
//        );
//        env.setRestartStrategy(RestartStrategies.failureRateRestart(
//                10, Time.of(1L, TimeUnit.DAYS), Time.of(3L, TimeUnit.MINUTES)
//        ));
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall/ck");
//        System.setProperty("HADOOP_USER_NAME", "atguigu");
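        // NOTE: re-enabling the checkpoint/restart block above also requires imports for
        // CheckpointingMode, CheckpointConfig, RestartStrategies, Time, TimeUnit and
        // HashMapStateBackend.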

        //TODO 2. Read the ODS-layer topic_db data from Kafka and create a stream
        KafkaSource<String> kafkaSource = MyKafkaUtil.getKafkaSource("topic_db", "dim_app_221109");
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
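        // MyKafkaUtil.getKafkaSource is not shown here; it is assumed to build a standard
        // KafkaSource<String> roughly as follows (the broker address is an assumption):
        //   KafkaSource.<String>builder()
        //           .setBootstrapServers("hadoop102:9092")
        //           .setTopics(topic)
        //           .setGroupId(groupId)
        //           .setStartingOffsets(OffsetsInitializer.latest())
        //           .setValueOnlyDeserializer(new SimpleStringSchema())
        //           .build();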

        //TODO 3. Check whether each record is valid JSON: convert valid records to JSONObject (main stream) and route invalid ones to a side output
        // Alternative flatMap version (kept for reference): without access to side outputs,
        // malformed records could only be dropped or fail the job. Uncommenting it would
        // also require importing org.apache.flink.api.common.functions.FlatMapFunction.
//        kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
//            @Override
//            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
//                if (value != null) {
//                    try {
//                        JSONObject jsonObject = JSONObject.parseObject(value);
//                        out.collect(jsonObject);
//                    } catch (Exception e) {
//                        throw new RuntimeException(e);
//                    }
//                }
//            }
//        });
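
        // The empty anonymous subclass ({}) lets Flink capture the OutputTag's generic type,
        // which would otherwise be lost to erasure.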
        OutputTag<String> outputTag = new OutputTag<String>("Dirty") {};
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                if (value != null) {
                    try {
                        JSONObject jsonObject = JSONObject.parseObject(value);
                        out.collect(jsonObject);
                    } catch (Exception e) {
                        ctx.output(outputTag, value);
                    }
                }
            }
        });
        jsonObjDS.getSideOutput(outputTag).print("Dirty>>>>>>");
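        // In a production job, dirty records would more likely be forwarded to a dedicated
        // Kafka topic for inspection; printing them is only useful for local debugging.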

        jsonObjDS.print("jsonObjDS>>>>>>>>");

        //TODO 4. Use Flink CDC to read the configuration table from MySQL and create the config stream
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop103")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-221109-config")
                .tableList("gmall-221109-config.table_process")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
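        // StartupOptions.initial() first takes a full snapshot of the config table and then
        // switches to reading the MySQL binlog, so pre-existing config rows are not missed.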
        DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source");

        //TODO 5. Convert the config stream into a broadcast stream
        MapStateDescriptor<String, TableProcess> stateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = mysqlDS.broadcast(stateDescriptor);
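        // Broadcasting replicates every config record to all parallel instances of the
        // downstream operator, which store it in the map state declared above.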

        //TODO 6. Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);

        //TODO 7. Process the connected stream: filter main-stream data according to the broadcast state
        SingleOutputStreamOperator<JSONObject> hbaseDS = connectedStream.process(new DimTableProcessFunction(stateDescriptor));
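        // DimTableProcessFunction (not shown here) is assumed to parse config records into
        // TableProcess objects, keep them in broadcast state keyed by source table name, and
        // use that state to filter main-stream records and strip columns not needed in DIM.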

        //TODO 8. Write the data out to Phoenix
        hbaseDS.print("hbaseDS>>>>>>>>");
        hbaseDS.addSink(new DimSinkFunction());
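        // DimSinkFunction (not shown here) is assumed to upsert each record into its target
        // Phoenix table, e.g. via the Phoenix JDBC driver.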

        //TODO 9. Start the job
        env.execute("DimApp");

    }
}
